基于Linux共享内存的数据分发DDS

您所在的位置:网站首页 数据分发服务 pan 基于Linux共享内存的数据分发DDS

基于Linux共享内存的数据分发DDS

2024-07-10 07:00| 来源: 网络整理| 查看: 265

基于共享内存的数据分发DDS——C语言实现 一、软件功能介绍

在linux环境下用C语言开发的基于共享内存的数据分发DDS软件。采用了共享内存、多线程、读写锁以及互斥锁实现。

软件支持功能如下:

内部采用共享内存进行数据传输,支持多进程、多线程的数据发布、订阅功能。支持在数据发布端或订阅端使用队列功能,以适应不同的应用场景集成类似rostopic、rosbag命令行工具支持采用cjson、protobuf-c、结构体格式数据发布订阅数据。 二、软件接口介绍 /******************************************************************* * 函 数 名:init * 功能描述:数据分发功能初始化 * 输入参数:无 * 输出参数:无 * 返 回 值:无 ******************************************************************/ void init(void); /******************************************************************* * 函 数 名:deinit * 功能描述:数据分发功能销毁 * 输入参数:无 * 输出参数:无 * 返 回 值:无 ******************************************************************/ void deinit(void); /******************************************************************* * 函 数 名:create_node * 功能描述:创建一个节点 * 输入参数:node_name 节点名称 * 输出参数:无 * 返 回 值:节点结构体指针 ******************************************************************/ node_struct* create_node(const uint8_t node_name[NODE_MAX_LENGTH]); /******************************************************************* * 函 数 名:destroy_node * 功能描述:销毁节点 * 输入参数:node 节点结构体指针 * 输出参数:无 * 返 回 值:无 ******************************************************************/ void destroy_node(node_struct* node); /******************************************************************* * 函 数 名:show_topic_list * 功能描述:获取所有话题 * 输入参数:node 节点结构体指针 * 输出参数:topic_list 话题名数组;topic_count 话题个数 * 返 回 值:无 ******************************************************************/ void show_topic_list(node_struct* node,uint8_t topic_list[TOPIC_COUNT_MAX_SUPPORT][TOPIC_MAX_LENGTH],uint32_t* topic_count); /******************************************************************* * 函 数 名:create_pub_with_topic * 功能描述:创建一个发布者,并绑定到一个话题中 * 输入参数:node 节点结构体指针;topic_name 话题名;max_data_size 话题数据的最大长度单位字节;pub_rate 数据发布频率单位hz; * 输入参数:pub_model 发布模式;queue_max_count 队列元素最大个数 * 输出参数:无 * 返 回 值:发布结构体指针 ******************************************************************/ topic_pub_struct* create_pub_with_topic(node_struct* node,const uint8_t topic_name[TOPIC_MAX_LENGTH],const int max_data_size, float pub_rate,PUB_DATA_MODEL pub_model,uint32_t queue_max_count); /******************************************************************* * 函 数 名:pub_data * 功能描述:发布数据 * 输入参数:pub_dev 发布结构体指针;data 发布数据;data_size 发布数据大小 * 输出参数:无 * 返 回 值:成功返回0,失败返回-1 ******************************************************************/ int pub_data(topic_pub_struct* pub_dev,const uint8_t* data,const size_t data_size); /******************************************************************* * 函 数 名:destroy_pub * 功能描述:销毁发布者 * 输入参数:pub_ptr 发布结构体指针 * 输出参数:无 * 返 回 值:无 ******************************************************************/ void destroy_pub(topic_pub_struct* pub_ptr); /******************************************************************* * 函 数 名:create_sub_with_topic * 功能描述:创建一个订阅者,并绑定到一个话题中 * 输入参数:node 节点结构体指针;topic_name 话题名;max_data_size 话题数据的最大长度单位字节;data_process 数据处理回调函数; * 输入参数:sub_rate 数据订阅频率单位hz;sub_model 订阅模式;queue_max_count 队列元素最大个数 * 输出参数:无 * 返 回 值:订阅结构体指针 ******************************************************************/ topic_sub_struct* create_sub_with_topic(node_struct* node,const uint8_t topic_name[TOPIC_MAX_LENGTH],const int max_data_size, sub_callback_func data_process,float sub_rate,SUB_DATA_HANDLE_MODEL sub_model,uint32_t queue_max_count); /******************************************************************* * 函 数 名:sub_data_run * 功能描述:开始启动子线程订阅数据 * 输入参数:sub_ptr 订阅结构体指针 * 输出参数:无 * 返 回 值:成功返回0;失败返回-1 ******************************************************************/ int sub_data_run(topic_sub_struct* sub_ptr); /******************************************************************* * 函 数 名:destroy_sub * 功能描述:销毁订阅者 * 输入参数:sub_ptr 订阅结构体指针 * 输出参数:无 * 返 回 值:无 ******************************************************************/ void destroy_sub(topic_sub_struct* sub_ptr); 三、软件实现原理

软件架构

首先在共享内存上申请绑定固定key的话题管理内存块,用于存储所有正在使用的话题信息。里面将话题名与共享内存块的ID一一对应。

typedef struct { int32_t reference; //话题管理引用计数,当计数为0后表示可以释放该共享内存 mutex_struct lock; //同步锁 uint32_t topic_count; //已经存储的topic个数 topic_struct topic[TOPIC_COUNT_MAX_SUPPORT]; }topic_manage_struct; //话题管理,所有进程都可以访问

应用程序首先从话题管理共享内存块获取话题名对应的共享内存ID(如果没有则创建,并将id与话题名填入话题管理共享内存块中)。然后开始向话题名对应的共享内存块发布或者订阅数据。

每块共享内存分为数据头块和数据块,数据头块保存当前数据的信息,包括用于同步的读写锁、当前数据的大小以及数据更新标识符。

typedef struct { int32_t reference; //话题引用计数,当计数为0后表示可以释放该共享内存 mutex_struct lock; //话题信息同步锁 size_t max_data_size; //话题最大数据大小 pub与sub创建时指定的大小必须一致 rwlock_struct rwlock; //数据同步锁 size_t data_size; //话题当前数据大小 uint8_t data_has_flag; //数据标识,0 表示没有数据;每更新一次数据增1 uint32_t data_pub_cycle; //数据发布周期 }topic_header_struct; //话题数据结构体 四、数据分发命令行工具

cdds-topic

tcdds-topic list #列出当前运行的话题 cdds-topic hz topicname #计算当前话题数据的发布的平均频率

cdds-bag

cdds-bag record topicname1 topicname2 topicname3 ... #对执行的话题进行数据录制 cdds-bag player bag_file #播放录制的文件 五、测试DEMO

基于结构体数据发布订阅

/**************************************************************** 文件名称: main.c 功能描述: 基于共享内存的数据分发测试demo 创建日期: 2024-02-15 作者 : skynet 版本 : V1.0 修订记录: ***************************************************************/ #include "cshmdds.h" #include #include #include #include #include typedef struct { uint64_t timestamp; uint64_t sequence_num; uint8_t age; uint8_t name[9]; uint8_t phone_num[12]; uint8_t email[64]; }person_info_struct; static uint64_t get_time() { struct timeval tv; gettimeofday(&tv, NULL); return tv.tv_sec * 1000000 + tv.tv_usec; } void help_usage(void) { printf("Usage:\n"); printf("\tcdds-test pub topicname pubfreq\n"); printf("\tcdds-test sub topicname subfreq\n"); } void test_sub_callback(const uint8_t* data,const size_t data_size) { person_info_struct* person_data=(person_info_struct*)data; LOG_RECORD(LOGINFO,"sequence_num=%ld,timestamp=%ld,age=%d,name=%s,phone_num=%s,email=%s\n",person_data->sequence_num,person_data->timestamp,person_data->age,person_data->name,person_data->phone_num,person_data->email); } int main(int argc, char* argv[]) { uint8_t pub_sub_type[4]={'\0'}; uint8_t topic_name[TOPIC_MAX_LENGTH]={'\0'}; float pub_or_sub_freq=0; LOG_RECORD(LOGDEBUG,"start...argc=%d\n",argc); if(argc!=4) { help_usage(); return -1; } else { strcpy(pub_sub_type,argv[1]); strcpy(topic_name,argv[2]); pub_or_sub_freq=atof(argv[3]); LOG_RECORD(LOGINFO,"pub_sub_type=%s\ntopic_name=%s\ndata_size=%ld\npub_or_sub_freq=%f\n",pub_sub_type,topic_name,sizeof(person_info_struct),pub_or_sub_freq); if(pub_or_sub_freqsequence_num,test_unpack->age,test_unpack->name,test_unpack->phone_num,test_unpack->email); test_message__free_unpacked(test_unpack, NULL); } uint32_t init_message() { uint32_t total_size=0; test_msg.timestamp=get_time(); total_size+=sizeof(test_msg.timestamp); test_msg.sequence_num=0; total_size+=sizeof(test_msg.sequence_num); test_msg.age=0; total_size+=sizeof(test_msg.age); test_msg.name = (char*)calloc(1, 32); sprintf(test_msg.name,"helloworld"); total_size+=32; test_msg.phone_num = (char*)calloc(1, 12); sprintf(test_msg.phone_num,"11133327651"); total_size+=12; test_msg.email = (char*)calloc(1, 18); sprintf(test_msg.email,"[email protected]"); total_size+=18; return total_size; } int main(int argc, char* argv[]) { uint8_t pub_sub_type[4]={'\0'}; uint8_t topic_name[TOPIC_MAX_LENGTH]={'\0'}; float pub_or_sub_freq=0; LOG_RECORD(LOGDEBUG,"start...argc=%d\n",argc); if(argc!=4) { help_usage(); return -1; } else { strcpy(pub_sub_type,argv[1]); strcpy(topic_name,argv[2]); pub_or_sub_freq=atof(argv[3]); LOG_RECORD(LOGINFO,"pub_sub_type=%s\ntopic_name=%s\npub_or_sub_freq=%f\n",pub_sub_type,topic_name,pub_or_sub_freq); if(pub_or_sub_freq


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3